AddIns and Scripting

Roslyn .NET Scripting is an easy way to write custom functionality with C# scripts that are compiled during startup.

You can either write inline scripts in Foopipes.yml or load .csx scripts from file or an url.

Addins

Addins are scripts that are loaded, compiled and run at startup. Typically an addin register new service types and tasks which then are available for the pipelines.

addins:
  - url: "https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/master/Mailgun/mailgun.csx"
  - script: |
      PipelineTask("nospaces").Json((context, json, ct) =>
         { 
           json.Data["name"] = json.Data["name"].Replace(" ", "");
           return json;
         });

services:
  mailgun:
    type: mailgun
    apiBaseUrl: https://api.mailgun.net/v3/sandbox5ded26xxxxxxxxxxxxb8.mailgun.org
    apiKey: key-3a56bxxxxxxxxxxxxxxx5c
    defaultFrom: me@mydomain.com

pipelines:
  - 
    when: 
      - queue: started
    from:
      - http: "https://jsonplaceholder.typicode.com/posts"
    do:
      - nospaces
    to:
      - log
    error:
      - { mailgun.send, to: me@mydomain.com, subject: Error, text: An error occured }

Community Addins

Addins created by the community is available as a public repository on Github: https://github.com/AreteraAB/Foopipes.Addins.

When loading a community Addin, use this url format: https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/master/Mailgun/mailgun.csx.

Consider using a tag or commit hash instead of the latest version: https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/47e1e6d74f2546673b74f929f9ebf74ca56afae5/Tail/Tail.csx.

Pull requests are welcome!

Registering tasks

Register custom tasks by passing a callback to the method PipelineTask(string name).

Pipeline tasks can be json and/or binary or dynamic, depending what kind of data they're able to handle.

PipelineTask("mytask").Json(async (context, json, cancellationToken) => 
    {
        // Do something here with json data
        return json;
    });

PipelineTask("mytask").Binary(async (context, data, cancellationToken) =>
    {
        // Do something here with binary data
        return data;
    });

Registering services

Custom services is the best way to keep state. You can either register a service type which later can be referenced in the configuration file, or a service instance which will be a named singleton.

using Foopipes.Abstractions.Services;

class MyService : ServiceBase
{
    public string MyConfigValue => Config["myConfigValue"];
    private int _counter = 0;

    public int IncrementCounter()
    {
        return Interlocked.Increment(ref _counter);
    }
}

Service.Register("myserviceType", typeof(MyService));

Create an instance and configure your service in the configuration file:

services:
  myserviceInstanceName:
    type: myserviceType
    myConfigValue: hello

In your tasks, you can get hold of a service instance like this:

PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        var service = await context.GetService<MyService>(defaultName: "myserviceInstanceName");
        json.Data["counter"] = service.IncrementCounter();
        return json;
    });

Observer/observable pattern

If your service implements IObservableService you can emit events that triggers pipelines. Very powerful combined with IRunnableService and System.Reactive.

#r "System.Reactive"

using System.Reactive.Subjects;
using Foopipes.Abstractions.Services;

class MyObservableService : ServiceBase, IObservableService, IRunnableService
{
    private Subject<ServiceEvent> _subject = new Subject<ServiceEvent>();

    // IObservableService 
    public IObservable<ServiceEvent> Observable => _subject;

    // IRunnableService
    public async Task Run(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(1000, cancellationToken);

            var metadata = JObject.FromObject(new
            {
                currentTime = DateTime.Now,
            });

            var serviceEvent = new ServiceEvent(this, metadata, new[] { new BinaryData(new byte[]{ 0x42} }));
            _subject.OnNext(serviceEvent);
        }
    }
}

Service.Register("myObservableService", typeof(MyObservableService));
services: 
  myObservableService: 
    type: myObservableService

pipelines: 
  - 
    when: 
      - myObservableService
    to:
      - log 

Invoking tasks from a task

You can write tasks that invoke other tasks.

PipelineTask("sendGreeting").Json(async (context, json, cancellationToken) =>
    {
        var data = JObject.FromObject(new
        {
            greeting = "hello " + await context.GetExpandedConfigValue("name")
        });

        var config = new Dictionary<string, string>
            {
                {"url", "https://www.myservice.com/api" },
                {"method", "post"},
                {"body", "formUrlEncoded"}
            };

        var r = await context.RunTask("http").WithData(data).WithArguments(config).Invoke(cancellationToken);
        return json;
    });

Invoke with:

  do:
    - { sendGreeting, name: "Foo #{lastname}" }

Returning results

Task callbacks can return json and/or binary data.

PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        return new JsonData( JObject.FromObject(new { hello="world"}) );
    });
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        return new BinaryData(new byte[]{0x42});
    });

It's also possible to return multiple results.

PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        return new ProcessJsonResult( new[]{ 
            JObject.FromObject(new { hello="world1" }),
            JObject.FromObject(new { hello="world2" }),
        });
    });

Referencing other assemblies

You can reference assemblies using the #r syntax.

#r "System.Security.Cryptography.Csp"

using System.Security.Cryptography;

var _aes = Aes.Create();

PipelineTask("decryptstring").Binary(async (context, binary, cancellationToken) =>
    {
        using (var decryptor = _aes.CreateDecryptor(key, iv))
        {
            // etc etc
        }
        return JObject.FromObject(new { value=decryptedData });
    });

Currently it is not possible to reference Nuget assembles.

Data binding

Use context.BindValue(string bindingExpression) to obtain a value using the data binding functionality.

PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        var myvalue = await context.BindValue("#{elasticsearch:myvalue}");
        json.Data["boundValue"] = myvalue;
        return json;
    });

Similary, use context.SetValue(string key, string value) to set a value.

PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
    {
        await context.SetValue("elasticsearch:myvalue", "hello world");
        return json;
    });

Class Reference

Addin host globals:

{
    ITaskBuilder PipelineTask(string name);
    IServiceBuilder Service { get; }
}
interface ITaskBuilder
{
    /************ Async Json ************/
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<IProcessResult>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JsonData>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JObject>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JObject[]>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<BinaryData>> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<byte[]>> func);

    /************ Non async Json ************/
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, IProcessResult> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JsonData> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JObject> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JObject[]> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, byte[]> func);
    public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, BinaryData> func);

    /************ async Binary************/
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<IProcessResult>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JsonData>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JObject>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JObject[]>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<BinaryData>> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<byte[]>> func);

    /************ Non async Binary************/
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, IProcessResult> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JsonData> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JObject> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JObject[]> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, byte[]> func);
    public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, BinaryData> func);

    /************ Non async Dynamic ************/
    public ITaskBuilder Dynamic(Func<IScriptTaskContext, dynamic, CancellationToken, dynamic> func);

    /************ async Dynamic ************/
    public ITaskBuilder Dynamic(Func<IScriptTaskContext, dynamic, CancellationToken, Task<dynamic>> func);

    public ITaskBuilder WithDefaultConfigKey(string defaultConfigKey);
}

interface IServiceBuilder
{
    public IServiceBuilder Instance(string name, IService instance);
    public IServiceBuilder Register(string typeName, Type serviceType);
}
interface IScriptTaskContext
{
    IDictionary<string, string> Config { get; }
    ILogger Logger { get; }
    ILoggerFactory LoggerFactory { get; }
    IPipelineContext PipelineContext { get; }
    IServiceProvider ServiceProvider { get; }

    Task<string> BindValue(string bindingExpression);
    Task<string> GetExpandedConfigValue(string key, bool throwIfNotSet = true);
    IService GetService(string name);
    IRunTaskBuilder RunTask(string name);
    Task SetValue(string key, string val);
}
public static class ScriptTaskContextExtensions
{
    public static async Task<TService> GetService<TService>(this IScriptTaskContext context, 
        string defaultName, 
        string configKeyName = "service",
        bool throwIfNotFound = true) where TService : class;
}
public class JsonData : IProcessResultData
{
    public JsonData(JObject jsonData, JObject metadata=null);

    public JObject Metadata { get; }
    public JObject Data { get; }

    public static JsonData Empty { get; }
}

public class BinaryData : IProcessResultData
{
    public BinaryData(byte[] binaryData, JObject metadata = null);

    public JObject Metadata { get; }
    public byte[] Data { get; }
    public static BinaryData Empty { get; }
}